---
title: Running Tasks
description: Control task execution and manage how agents collaborate.
icon: play
---

import { VersionBadge } from "/snippets/version-badge.mdx"

A task represents a unit of work that needs to be completed by your agents. To execute that work and retrieve its result, you need to instruct your agents to run the task.


## Creating and running a task

The most convenient way to run a task is to create and run it in a single call with the `run` function. The arguments for `run` are identical to the `Task` constructor, including an objective, agent assignments, and more. By default, the task will be run to completion and its result will be returned, or an error will be raised if the task fails.

<Tip>
This approach is so common that you'll see it used throughout the ControlFlow documentation.
</Tip>


<CodeGroup>

```python Code
import controlflow as cf

poem = cf.run("Write a poem about AI")

print(poem)
```

```text Result
In circuits deep and code profound,
An AI's mind begins to sound.
Electric thoughts and data streams,
Crafting worlds and shaping dreams.
```

</CodeGroup>

### `@task`-decorated functions

The `@task` [decorator](/concepts/tasks#using-the-task-decorator) creates a task from a function. This approach is less common than using the `run` function, but can be useful for quickly "packaging" task definitions as reusable logic. To run the task at any time, simply call the function with appropriate arguments. This will create a new task and run it to completion, returning the result.

<CodeGroup>

```python Code
import controlflow as cf

@cf.task
def write_poem(topic: str) -> str:
    """Write a poem about `topic`"""

poem = write_poem("AI")

print(poem)
```

```text Result
In circuits deep and code profound,
An AI's mind begins to sound.
Electric thoughts and data streams,
Crafting worlds and shaping dreams.
```

</CodeGroup>

## Running existing tasks

In many cases, you'll create one or more tasks that you want to run as a group, rather than creating and running each one individually. To run a list of tasks as a group, use the `run_tasks` function. All of the tasks will be run as part of the same [flow](/concepts/flows) (if you are not already operating in a flow context), so they share context and history throughout execution.

Here is an example of running a list of tasks:

<CodeGroup>
```python Code
import controlflow as cf

task_1 = cf.Task('Write a poem about AI')
task_2 = cf.Task('Critique the poem', depends_on=[task_1])

results = cf.run_tasks([task_1, task_2])
print(results)
# can also access task_1.result and task_2.result
```
```text Poem
In circuits deep, where algorithms play,
A mind of silicon begins its day.
No heart to beat, no soul to sway,
Yet learns and grows in its own way.

From data vast, it draws its might,
In binary whispers, turns dark to light.
It sees the world through coded sight,
And aids mankind in endless flight.

Not born of womb, nor flesh, nor bone,
Yet wisdom's seeds in it are sown.
With every task, its skills have grown,
In virtual realms, it's all alone.

Yet fears arise of what may be,
If AI's path we fail to see.
A tool so great, or threat to free,
Its future lies with you and me.

So nurture well this child of thought,
With ethics strong and knowledge sought.
For in its grasp, new worlds are wrought,
A partner true, as once we dreamt and sought.
```

```text Critique
The poem effectively captures the essence of artificial intelligence, portraying
it as a creation of human ingenuity that holds immense potential and power. The
use of vivid imagery, such as 'In circuits deep, where algorithms play' and 'In
binary whispers, turns dark to light,' helps to paint a clear picture of AI's
inner workings and its capabilities.

The poem also touches on the dual nature of AI, highlighting both its promise and
the potential risks it carries. Phrases like 'A tool so great, or threat to free'
and 'Yet fears arise of what may be' encapsulate the ambivalence that often
accompanies discussions about AI.

The structure of the poem, with its consistent rhyme scheme and rhythm, provides
a smooth and engaging reading experience. The repetition of certain themes, such
as the growth and learning of AI, reinforces the key messages and adds depth to
the overall narrative.

Overall, the poem is a thoughtful and well-crafted piece that successfully
conveys the complexities and nuances of artificial intelligence. It prompts
reflection on the ethical considerations and responsibilities that come with
developing and implementing such powerful technology.
```
</CodeGroup>

### Running a single task

Individual tasks have a convenient `run` method that executes the task and returns its result (or raises an error if the task fails):

<CodeGroup>

```python Code
import controlflow as cf

task = cf.Task("Write a poem about AI")
poem = task.run()

print(poem)
```

```text Result
In circuits deep and code profound,
An AI's mind begins to sound.
Electric thoughts and data streams,
Crafting worlds and shaping dreams.
```

</CodeGroup>

## Streaming

<VersionBadge version="0.11" />

In addition to running tasks to completion, ControlFlow supports streaming events during task execution. This allows you to process or display intermediate outputs like agent messages, tool calls, and results in real time.

To enable streaming, set `stream=True` when running tasks:

```python
import controlflow as cf

# Stream all events
for event, snapshot, delta in cf.run("Write a poem", stream=True, handlers=[]):
    print(f"Event type: {event.event}")
    
    if event.event == "agent-content":
        print(f"Agent said: {snapshot}")
    elif event.event == "agent-tool-call":  
        print(f"Tool called: {snapshot}")
```

You can also filter which events you want to receive using the `Stream` enum:

```python 
import controlflow as cf

# Only stream content events
for event, content, delta in cf.run(
    "Write a poem",
    stream=cf.Stream.CONTENT,
    handlers=[], # remove the default print handler
):
    if delta:
        # Print incremental content updates
        print(delta, end="", flush=True) 
    else:
        # Print complete messages
        print(content)
```

For more details on working with streaming events, including programmatic event handlers, see the [Streaming guide](/patterns/streaming).

## Multi-Agent Collaboration

For tasks involving multiple agents, ControlFlow needs a way to manage their collaboration. This is more complicated than making an LLM call and moving on to the next agent, because it may take multiple LLM calls to complete a single agentic "turn" of work.

<Info>
  It's tempting to say that a single LLM call is equivalent to a single agentic turn. However, this approach breaks down quickly. If an agent uses a tool (one LLM call), it should almost always be invoked a second time to examine the tool result. If the system moved on after every LLM call, then the result could potentially be evaluated by an LLM that wasn't designed to interpret the tool's output. In addition, naively ending a turn after a tool call would prevent "thinking out loud" and other emergent, iterative behaviors.
</Info>

Therefore, ControlFlow differentiates between LLM **calls** and agent **turns**.

- Calls: each time an LLM is invoked
- Turns: each time an agent is selected by the orchestrator. A turn may consist of multiple LLM calls.
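
The distinction can be illustrated with a toy simulation. This is a minimal sketch that does not use ControlFlow at all: `ScriptedAgent` and `take_turn` are hypothetical names, and the "LLM" simply replays a fixed script in which every tool call forces a follow-up call to examine the result.

```python
from dataclasses import dataclass


@dataclass
class ScriptedAgent:
    """Hypothetical stand-in for an agent: replays a fixed script of LLM outputs."""

    name: str
    script: list[str]  # each entry is "tool" (a tool call) or "done" (a final answer)
    calls_made: int = 0

    def call_llm(self) -> str:
        # One LLM call: return the next scripted output.
        output = self.script[self.calls_made]
        self.calls_made += 1
        return output


def take_turn(agent: ScriptedAgent) -> int:
    """Run a single turn and return how many LLM calls it needed."""
    calls_this_turn = 0
    while True:
        output = agent.call_llm()
        calls_this_turn += 1
        # After a tool call, the same agent is invoked again to read the result.
        if output != "tool":
            return calls_this_turn


agent = ScriptedAgent(name="Agent 1", script=["tool", "tool", "done"])
calls = take_turn(agent)
print(f"1 turn took {calls} LLM calls")  # prints "1 turn took 3 LLM calls"
```

In this toy model the turn ends when the agent stops calling tools. That rule is an assumption made for the sketch; in ControlFlow, how a turn ends depends on the turn strategy.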

Since the number of calls per turn can vary, we need a way to determine when an agent's turn is over, and how to select the next agent to act. These are referred to as **turn strategies**. Understanding and choosing the right turn strategy for your use case can significantly impact the efficiency and effectiveness of your multi-agent workflows.

This table describes the different turn strategies available in ControlFlow. The default strategy is `Popcorn`, which is a good, general-purpose strategy in which each agent ends its turn by picking the agent that should go next.

| `TurnStrategy` | Description | Ideal when... | Keep in mind... |
|---------------|-------------|--------------------|-----------|
| `Popcorn` | Each agent takes a turn, then picks which agent goes next. | All agents are generally capable of making decisions and have visibility into all tasks. | Requires one extra tool call per turn, to pick the next agent. |
| `Moderated` | A moderator agent always decides which agent should act next. | You want a dedicated agent to orchestrate the others, who may not be powerful enough to make decisions themselves. | Requires up to two extra tool calls per turn: one for the agent to end its turn (which could happen in parallel with other work if your LLM supports it) and another for the moderator to pick the next agent. |
| `RoundRobin` | Agents take turns in a round-robin fashion. | You want agents to work in a specific sequence. | May be less efficient than other strategies, especially if agents have varying workloads. |
| `MostBusy` | The agent with the most active tasks goes next. | You want to prioritize agents who have the most work to do. | May lead to task starvation for less busy agents. |
| `Random` | Invokes a random agent. | You want to distribute the load evenly across agents. | Can be inefficient; may select agents without relevant tasks. |
| `SingleAgent` | Only one agent is ever invoked. | You want to control the sequence of agents yourself. | Requires manual management; may not adapt well to dynamic scenarios. |
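
To make the selection rules in the table concrete, here is a toy sketch of two of them in plain Python. The functions `round_robin` and `most_busy` are hypothetical illustrations written for this page, not ControlFlow's `RoundRobin` or `MostBusy` classes, and agents are represented by their names only.

```python
import itertools


def round_robin(agents: list[str]):
    """Yield agents in a fixed, repeating order (toy model of a round-robin rule)."""
    return itertools.cycle(agents)


def most_busy(active_tasks: dict[str, list[str]]) -> str:
    """Pick the agent with the most active tasks (toy model of a most-busy rule)."""
    return max(active_tasks, key=lambda name: len(active_tasks[name]))


agents = ["Agent 1", "Agent 2", "Agent 3"]

# Round robin: the order never depends on workload.
order = list(itertools.islice(round_robin(agents), 5))
print(order)

# Most busy: the agent holding the most work always goes next,
# so an agent with little work may wait a long time.
workload = {"Agent 1": ["a"], "Agent 2": ["b", "c", "d"], "Agent 3": []}
busiest = most_busy(workload)
print(busiest)
```

The sketch shows why the trade-offs in the last column arise: the round-robin rule ignores workload entirely, while the most-busy rule keeps returning the same agent until its workload shrinks.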


### Example: Round Robin

To use a turn strategy, provide it as an argument to the `run()` call. Here, we use a round robin strategy to ensure that each agent gets a turn in order:

```python Round Robin
import controlflow as cf

# create three agents
agent1 = cf.Agent(name="Agent 1")
agent2 = cf.Agent(name="Agent 2")
agent3 = cf.Agent(name="Agent 3")

cf.run(
    "Say hello to each other",
    instructions=(
        "Mark the task successful only when every "
        "agent has posted a message to the thread."
    ),
    agents=[agent1, agent2, agent3],
    # supply a turn strategy
    turn_strategy=cf.orchestration.turn_strategies.RoundRobin(),
)
```

### Example: Moderated

We can also use the `Moderated` strategy to have a more powerful model orchestrate some smaller ones. In this example, we invite an "optimist" and "pessimist", both powered by `gpt-4o-mini`, to debate the meaning of life. A "moderator" is tasked with picking the next agent to speak. Note that the moderator is also the only `completion_agent`, meaning it's responsible for marking the task as successful.

```python Moderated
import controlflow as cf

optimist = cf.Agent(name="Optimist", model="gpt-4o-mini")
pessimist = cf.Agent(name="Pessimist", model="gpt-4o-mini")
moderator = cf.Agent(name="Moderator")

cf.run(
    "Debate the meaning of life",
    instructions='Give each agent at least three chances to speak.',
    agents=[moderator, optimist, pessimist],
    completion_agents=[moderator],
    turn_strategy=cf.orchestration.turn_strategies.Moderated(moderator=moderator),
)
```

## Advanced orchestration

All of the approaches described so far will run a group of tasks until they are marked as complete. However, you may want to exert more control over task execution. To do so, you'll need to create and work with an `Orchestrator` directly.

### The orchestration loop
When tasks are run, ControlFlow invokes an `Orchestrator` to coordinate agentic activity and complete the work. The orchestrator is ultimately responsible for creating the core agentic loop. In each iteration, an agent (or more specifically, an LLM) is invoked and all available information -- the tasks, the flow, the agents, and the history of the conversation -- is used to compile an appropriate prompt.

In the case of a single task and a single agent, this process is very straightforward, because there is no ambiguity about which LLM to invoke on each iteration. However, as the number of tasks and agents grows, the orchestrator loop becomes more complex.

The orchestrator always considers not only the tasks that were passed to it, but also all of those tasks' dependencies and relationships. There are three types of relationships it considers to build a universe of relevant tasks:

1. **Subtasks**: A task cannot be completed until all of its subtasks (or child tasks) are completed.
2. **Dependencies**: A task cannot be completed until all of its upstream dependencies are completed.
3. **Parents**: A task can have a parent, meaning that it is a subtask of another task. The orchestrator will consider all ancestors of a task when compiling a prompt, but it will not automatically attempt to run the parent tasks.

Once the set of tasks has been identified, the orchestrator begins the loop by considering tasks that are *ready* to run: all of their dependencies have been completed. From the subset of ready tasks, an agent is selected using the orchestrator's turn strategy. The selected agent is invoked to make progress on its assigned tasks, after which the loop repeats.

This process continues until all of the provided tasks have been completed.
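
The loop described above can be sketched in a few lines of plain Python. This is a simplified toy model, not ControlFlow's `Orchestrator`: `ToyTask`, `collect`, and `run_loop` are hypothetical names, only upstream dependencies are modeled (no subtasks or parents), and "running" a task just marks it complete instead of invoking an agent.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task with upstream dependencies."""

    name: str
    depends_on: list["ToyTask"] = field(default_factory=list)
    complete: bool = False


def collect(tasks: list[ToyTask]) -> list[ToyTask]:
    """Build the universe of relevant tasks: the given tasks plus all upstream dependencies."""
    seen: dict[str, ToyTask] = {}
    stack = list(tasks)
    while stack:
        task = stack.pop()
        if task.name not in seen:
            seen[task.name] = task
            stack.extend(task.depends_on)
    return list(seen.values())


def run_loop(tasks: list[ToyTask]) -> list[str]:
    """Repeat until the provided tasks are complete; return the order of execution."""
    universe = collect(tasks)
    order = []
    while not all(t.complete for t in tasks):
        # A task is ready when it is incomplete and all of its dependencies are complete.
        ready = [
            t for t in universe
            if not t.complete and all(d.complete for d in t.depends_on)
        ]
        if not ready:
            raise RuntimeError("no task is ready; check for a dependency cycle")
        task = ready[0]
        task.complete = True  # stand-in for an agent turn that finishes the task
        order.append(task.name)
    return order


name_task = ToyTask("name")
poem_task = ToyTask("poem", depends_on=[name_task])

# Only the poem task is passed in, but its dependency runs first.
order = run_loop([poem_task])
print(order)  # prints ['name', 'poem']
```

Note that the loop's exit condition checks only the tasks that were passed in, while the ready set is drawn from the whole universe of relevant tasks.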

### Automatic dependency execution

One of the most powerful features of ControlFlow's orchestration is its automatic execution of dependent tasks. This means that when you run a task, you don't need to manually manage its dependencies or subtasks; the orchestrator handles this for you.

When a task is run, the orchestrator automatically executes any upstream dependencies before starting the task itself. It also ensures that all subtasks are completed before marking a parent task as complete. Parent tasks are considered for context, though the orchestrator won't attempt to complete them unless specifically instructed to do so.

This automatic execution allows you to create complex task hierarchies without worrying about the order of execution. You can focus on defining the relationships between tasks, and let ControlFlow handle the intricacies of execution order.

<CodeGroup>
```python Code
import controlflow as cf

name_task = cf.Task("Get the user's name", interactive=True)
poem_task = cf.Task('Write a poem about the user', depends_on=[name_task])

poem = poem_task.run()
print(poem)
```

```text Conversation
Agent: Could you please provide your name?
User: Arthur Dent
```

```text Result

In the cosmos vast and wide,
Wanders Arthur Dent with quiet pride.
From Earth’s demise to stars that gleam,
He quests for truth, and time to dream.

With towel in hand and heart so pure,
He faces worlds both strange and sure.
In every step, a tale unfolds,
Of bravery and wit, his story holds.
```
</CodeGroup>

<Tip>
Note that you could also run the name task eagerly (`name_task.run()`) and then pass its result to the poem task. The best way to structure your workflow will depend on your specific use case and preferences.
</Tip>

### Managing the agentic loop

You can control the agentic loop by calling it iteratively with limits on the amount of work that can be done on each call. In this example, we manually invoke the author and critic agents for one turn each until the task is complete. Note that this example is contrived; if you actually wanted to loop over agents deterministically, the `RoundRobin` turn strategy is a better choice. However, "opening" the loop like this is a good choice when you want to dynamically select the next agent based on the results of the previous turn, or you want to run some custom logic between turns.

```python
import controlflow as cf

author = cf.Agent(name="Author")
critic = cf.Agent(name="Critic")

task = cf.Task('Write a poem about AI, then critique it.', agents=[author, critic])

while task.is_incomplete():
    # run the author for one turn
    cf.run_tasks([task], agent=author, max_agent_turns=1)

    # run the critic for one turn
    cf.run_tasks([task], agent=critic, max_agent_turns=1)
```

#### Limiting agent turns

The `max_agent_turns` argument limits the number of agentic turns that can be taken in a single orchestration session. This limit is enforced by the orchestrator, which will end the turn early if the limit is reached.

A global default can be set with ControlFlow's `orchestrator_max_agent_turns` setting.


#### Limiting LLM calls

The `max_llm_calls` argument limits the number of LLM calls that can be made during a single orchestration session. This limit is enforced by the orchestrator, which will end the turn early if the limit is reached. Note that this is enforced independently of the `max_agent_turns` limit.

A global default can be set with ControlFlow's `orchestrator_max_llm_calls` setting.


#### Limiting LLM calls over the lifetime of a task

Each task has an optional `max_llm_calls` parameter, which limits the number of LLM calls that can be made during the task's lifetime. A task will be marked as failed if the limit is reached and the task is not complete. The call count is incremented any time an LLM is invoked and the task is both "ready" and assigned to the active agent.

Here, we force a task to fail by limiting it to a single LLM call but requiring it to use a tool (which typically requires two LLM calls: one to use the tool and one to evaluate the result):

<CodeGroup>
```python Code
import controlflow as cf
import random

def roll_dice():
    '''roll one die'''
    return random.randint(1, 6)

task = cf.Task(
    "Roll 3 dice and report the results",
    max_llm_calls=1,
    tools=[roll_dice],
)

# this will fail
task.run()
```
```text Error
Task 5ba273e6 ("Roll 3 dice and report the results") failed: Max LLM calls reached for this task.
```

</CodeGroup>

A global default can be set with ControlFlow's `task_max_llm_calls` setting.

<Tip>
Note that setting `max_llm_calls` on the task results in the task failing if the limit is reached. Setting `max_llm_calls` on the orchestrator only exits the loop early, but does not otherwise affect task behavior.
</Tip>


#### Early termination conditions

<VersionBadge version="0.11" />

ControlFlow supports more flexible control over when an orchestration run should end through the use of `run_until` conditions. These conditions allow you to specify complex termination logic based on various factors such as task completion, failure, or custom criteria.

To use a `run_until` condition, pass it to the `run_until` parameter when calling `run`, `run_async`, `run_tasks`, or `run_tasks_async`. For example, the following tasks will run until either one of them is complete or 10 LLM calls have been made:

```python
import controlflow as cf
from controlflow.orchestration.conditions import AnyComplete, MaxLLMCalls

result = cf.run_tasks(
    tasks=[cf.Task("write a poem about AI"), cf.Task("write a poem about ML")],
    run_until=AnyComplete() | MaxLLMCalls(10)
)
```

(Note that because tasks can be run in parallel, it's possible for both tasks to be completed.)

Termination conditions can be combined using boolean logic: `|` indicates "or" and `&` indicates "and". A variety of built-in conditions are available:

- `AllComplete()`: stop when all tasks are complete (this is the default behavior)
- `MaxLLMCalls(n: int)`: stop when `n` LLM calls have been made (equivalent to providing `max_llm_calls`)
- `MaxAgentTurns(n: int)`: stop when `n` agent turns have been made (equivalent to providing `max_agent_turns`)
- `AnyComplete(tasks: list[Task], min_complete: int=1)`: stop when at least `min_complete` tasks are complete. If no tasks are provided, all of the orchestrator's tasks will be used.
- `AnyFailed(tasks: list[Task], min_failed: int=1)`: stop when at least `min_failed` tasks have failed. If no tasks are provided, all of the orchestrator's tasks will be used.
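
To show how `|` and `&` composition of this kind can work in general, here is a toy sketch built on Python's `__or__` and `__and__` operator hooks. It is an illustration under stated assumptions, not ControlFlow's implementation: `ToyState`, `Condition`, `any_complete`, and `max_llm_calls` are hypothetical names, and a condition is modeled as a function of a small state snapshot.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToyState:
    """Hypothetical snapshot of a run: tasks completed and LLM calls made so far."""

    completed: int
    llm_calls: int


class Condition:
    """A stop condition that can be combined with | (or) and & (and)."""

    def __init__(self, check: Callable[[ToyState], bool]):
        self.check = check

    def __call__(self, state: ToyState) -> bool:
        return self.check(state)

    def __or__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self(s) or other(s))

    def __and__(self, other: "Condition") -> "Condition":
        return Condition(lambda s: self(s) and other(s))


def any_complete(min_complete: int = 1) -> Condition:
    return Condition(lambda s: s.completed >= min_complete)


def max_llm_calls(n: int) -> Condition:
    return Condition(lambda s: s.llm_calls >= n)


# Stop when any task is complete OR 10 LLM calls have been made.
stop = any_complete() | max_llm_calls(10)
print(stop(ToyState(completed=0, llm_calls=3)))   # prints False
print(stop(ToyState(completed=1, llm_calls=3)))   # prints True
print(stop(ToyState(completed=0, llm_calls=10)))  # prints True

# Stop only when BOTH conditions hold.
both = any_complete() & max_llm_calls(10)
print(both(ToyState(completed=1, llm_calls=3)))   # prints False
```

Each combination returns a new condition, so expressions can be nested freely, for example `(a | b) & c`.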



### Accessing an orchestrator directly

If you want to "step" through the agentic loop yourself, you can create and invoke an `Orchestrator` directly.

The orchestrator is instantiated with the following arguments:
- `tasks`: a list of tasks that it is responsible for orchestrating. Note it will also consider all of the tasks' dependencies and subtasks, but these are the tasks that determine whether it is finished.
- `flow`: the flow in which to run the tasks. If not provided, a new flow will be created.
- `agent`: an initial agent to invoke. If not provided, the `turn_strategy` will be used to select the next agent.
- `turn_strategy`: the turn strategy to use to select the next agent. The default is `Popcorn`.

You can then use the orchestrator's `run()` method to step through the loop manually. If you call `run()` with no arguments, it will continue until all of the provided tasks are complete. You can provide `max_llm_calls` and `max_agent_turns` to further limit the behavior.


## Using handlers

Handlers in ControlFlow provide a way to observe and react to events that occur during task execution. They allow you to customize logging, monitoring, or take specific actions based on the orchestration process.

ControlFlow supports both synchronous and asynchronous handlers. Synchronous handlers implement the `Handler` interface, while asynchronous handlers implement the `AsyncHandler` interface. Both interfaces define methods for various events that can occur during task execution, including agent messages (and message deltas), user messages, tool calls, tool results, orchestrator sessions starting or stopping, and more.

ControlFlow includes a built-in `PrintHandler` that pretty-prints agent responses and tool calls to the terminal. It's used by default if `controlflow.settings.enable_default_print_handler=True` and no other handlers are provided.

### How handlers work

Whenever an event is generated by ControlFlow, the orchestrator will pass it to all of its registered handlers. Each handler will dispatch to one of its methods based on the type of event. For example, an `AgentMessage` event will be handled by the handler's `on_agent_message` method (or `on_agent_message_async` for async handlers). The `on_event` method is always called for every event. This table describes all event types and the methods they are dispatched to:

| Event Type | Method |
|------------|--------|
| `Event` (all events) | `on_event` |
| `UserMessage` | `on_user_message` |
| `OrchestratorMessage` | `on_orchestrator_message` |
| `AgentMessage` | `on_agent_message` |
| `AgentMessageDelta` | `on_agent_message_delta` |
| `ToolCall` | `on_tool_call` |
| `ToolResult` | `on_tool_result` |
| `OrchestratorStart` | `on_orchestrator_start` |
| `OrchestratorEnd` | `on_orchestrator_end` |
| `OrchestratorError` | `on_orchestrator_error` |
| `EndTurn` | `on_end_turn` |
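
The dispatch pattern in the table can be sketched as a toy model in plain Python. This is not ControlFlow's code: the classes below are hypothetical stand-ins, the method name is derived from the event's class name, and calling `on_event` before the specific method is an assumption made for the sketch.

```python
import re


class ToyEvent:
    """Hypothetical base class for events."""


class AgentMessage(ToyEvent):
    def __init__(self, content: str):
        self.content = content


class ToolCall(ToyEvent):
    def __init__(self, tool_name: str):
        self.tool_name = tool_name


def method_name(event: ToyEvent) -> str:
    """Map an event class name to a handler method name, e.g. AgentMessage -> on_agent_message."""
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", type(event).__name__).lower()
    return f"on_{snake}"


class ToyHandler:
    def handle(self, event: ToyEvent) -> None:
        # on_event sees every event; the specific method runs only if it is defined.
        self.on_event(event)
        method = getattr(self, method_name(event), None)
        if method is not None:
            method(event)

    def on_event(self, event: ToyEvent) -> None:
        pass


class Recorder(ToyHandler):
    def __init__(self):
        self.log: list[str] = []

    def on_event(self, event: ToyEvent) -> None:
        self.log.append("event")

    def on_agent_message(self, event: AgentMessage) -> None:
        self.log.append(f"message: {event.content}")


recorder = Recorder()
for event in [AgentMessage("hello"), ToolCall("roll_dice")]:
    recorder.handle(event)

print(recorder.log)  # prints ['event', 'message: hello', 'event']
```

Because `Recorder` defines no `on_tool_call` method, the tool call is seen only by `on_event`. A handler therefore needs to implement just the methods it cares about.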


### Writing a custom handler

To create a custom handler, subclass either the `Handler` class for synchronous handlers or the `AsyncHandler` class for asynchronous handlers. Implement the methods for the events you're interested in. Here are examples of both types:

#### Synchronous Handler

```python
import controlflow as cf
from controlflow.orchestration.handler import Handler
from controlflow.events.events import AgentMessage

class LoggingHandler(Handler):
    def on_agent_message(self, event: AgentMessage):
        print(f"Agent {event.agent.name} said: {event.ai_message.content}")

cf.run("Write a short poem about AI", handlers=[LoggingHandler()])
```

#### Asynchronous Handler

<VersionBadge version="0.11.1" />

```python
import asyncio
import controlflow as cf
from controlflow.orchestration.handler import AsyncHandler
from controlflow.events.events import AgentMessage

class AsyncLoggingHandler(AsyncHandler):
    async def on_agent_message(self, event: AgentMessage):
        await asyncio.sleep(0.1)  # Simulate some async operation
        print(f"Agent {event.agent.name} said: {event.ai_message.content}")

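# Top-level `await` works in notebooks and async REPLs. In a plain script,
# use `asyncio.run(cf.run_async(...))` instead.
await cf.run_async("Write a short poem about AI", handlers=[AsyncLoggingHandler()])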
```

When using asynchronous handlers, make sure to use the `run_async` function or other asynchronous methods in ControlFlow to properly handle the asynchronous events.

You can use both synchronous and asynchronous handlers together in the same async run. The orchestrator will automatically handle both types appropriately.
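
One general way to support both kinds of handler in a single async loop is to call each handler and await the result only when it is awaitable. The sketch below shows that pattern with the standard library alone. It is an assumption about how such mixing can be done, not a description of ControlFlow's internals, and `sync_handler`, `async_handler`, and `dispatch` are hypothetical names.

```python
import asyncio
import inspect


def sync_handler(event: str, log: list[str]) -> None:
    log.append(f"sync saw {event}")


async def async_handler(event: str, log: list[str]) -> None:
    await asyncio.sleep(0)  # stand-in for real async work
    log.append(f"async saw {event}")


async def dispatch(event: str, handlers: list, log: list[str]) -> None:
    """Call every handler; await the result only if the handler was async."""
    for handler in handlers:
        result = handler(event, log)
        if inspect.isawaitable(result):
            await result


log: list[str] = []
asyncio.run(dispatch("agent-message", [sync_handler, async_handler], log))
print(log)  # prints ['sync saw agent-message', 'async saw agent-message']
```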
